################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import Dict, Union, List, Optional

from pyflink.common.config_options import ConfigOption
from pyflink.java_gateway import get_gateway
from pyflink.table.schema import Schema
from pyflink.util.java_utils import to_jarray

__all__ = ['TableDescriptor', 'FormatDescriptor']


class TableDescriptor(object):
    """
    Describes a CatalogTable representing a source or sink.

    TableDescriptor is a template for creating a CatalogTable} instance. It closely resembles the
    "CREATE TABLE" SQL DDL statement, containing schema, connector options, and other
    characteristics. Since tables in Flink are typically backed by external systems, the
    descriptor describes how a connector (and possibly its format) are configured.

    This can be used to register a table in the Table API, see :func:`create_temporary_table` in
    TableEnvironment.
    """

    def __init__(self, j_table_descriptor):
        self._j_table_descriptor = j_table_descriptor

    @staticmethod
    def for_connector(connector: str) -> 'TableDescriptor.Builder':
        """
        Creates a new :class:`~pyflink.table.TableDescriptor.Builder` for a table using the given
        connector.

        :param connector: The factory identifier for the connector.
        """
        gateway = get_gateway()
        j_builder = gateway.jvm.TableDescriptor.forConnector(connector)
        return TableDescriptor.Builder(j_builder)

    def get_schema(self) -> Optional[Schema]:
        j_schema = self._j_table_descriptor.getSchema()
        if j_schema.isPresent():
            return Schema(j_schema.get())
        else:
            return None

    def get_options(self) -> Dict[str, str]:
        return self._j_table_descriptor.getOptions()

    def get_partition_keys(self) -> List[str]:
        return self._j_table_descriptor.getPartitionKeys()

    def get_comment(self) -> Optional[str]:
        j_comment = self._j_table_descriptor.getComment()
        if j_comment.isPresent():
            return j_comment.get()
        else:
            return None

    def __str__(self):
        return self._j_table_descriptor.toString()

    def __eq__(self, other):
        return (self.__class__ == other.__class__ and
                self._j_table_descriptor.equals(other._j_table_descriptor))

    def __hash__(self):
        return self._j_table_descriptor.hashCode()

    class Builder(object):
        """
        Builder for TableDescriptor.
        """

        def __init__(self, j_builder):
            self._j_builder = j_builder

        def schema(self, schema: Schema) -> 'TableDescriptor.Builder':
            """
            Define the schema of the TableDescriptor.
            """
            self._j_builder.schema(schema._j_schema)
            return self

        def option(self, key: Union[str, ConfigOption], value) -> 'TableDescriptor.Builder':
            """
            Sets the given option on the table.

            Option keys must be fully specified. When defining options for a Format, use
            format(FormatDescriptor) instead.

            Example:
            ::

                >>> TableDescriptor.for_connector("kafka") \
                ...     .option("scan.startup.mode", "latest-offset") \
                ...     .build()

            """
            if isinstance(key, str):
                self._j_builder.option(key, value)
            else:
                self._j_builder.option(key._j_config_option, value)
            return self

        def format(self,
                   format: Union[str, 'FormatDescriptor'],
                   format_option: ConfigOption[str] = None) -> 'TableDescriptor.Builder':
            """
            Defines the format to be used for this table.

            Note that not every connector requires a format to be specified, while others may use
            multiple formats.

            Example:
            ::

                >>> TableDescriptor.for_connector("kafka") \
                ...     .format(FormatDescriptor.for_format("json")
                ...                 .option("ignore-parse-errors", "true")
                ...                 .build())

                will result in the options:

                    'format' = 'json'
                    'json.ignore-parse-errors' = 'true'

            """
            if format_option is None:
                if isinstance(format, str):
                    self._j_builder.format(format)
                else:
                    self._j_builder.format(format._j_format_descriptor)
            else:
                if isinstance(format, str):
                    self._j_builder.format(format_option._j_config_option, format)
                else:
                    self._j_builder.format(
                        format_option._j_config_option, format._j_format_descriptor)
            return self

        def partitioned_by(self, *partition_keys: str) -> 'TableDescriptor.Builder':
            """
            Define which columns this table is partitioned by.
            """
            gateway = get_gateway()
            self._j_builder.partitionedBy(to_jarray(gateway.jvm.java.lang.String, partition_keys))
            return self

        def comment(self, comment: str) -> 'TableDescriptor.Builder':
            """
            Define the comment for this table.
            """
            self._j_builder.comment(comment)
            return self

        def build(self) -> 'TableDescriptor':
            """
            Returns an immutable instance of :class:`~pyflink.table.TableDescriptor`.
            """
            return TableDescriptor(self._j_builder.build())


class FormatDescriptor(object):
    """
    Describes a Format and its options for use with :class:`~pyflink.table.TableDescriptor`.

    Formats are responsible for encoding and decoding data in table connectors. Note that not
    every connector has a format, while others may have multiple formats (e.g. the Kafka connector
    has separate formats for keys and values). Common formats are "json", "csv", "avro", etc.
    """

    def __init__(self, j_format_descriptor):
        self._j_format_descriptor = j_format_descriptor

    @staticmethod
    def for_format(format: str) -> 'FormatDescriptor.Builder':
        """
        Creates a new :class:`~pyflink.table.FormatDescriptor.Builder` describing a format with the
        given format identifier.

        :param format: The factory identifier for the format.
        """
        gateway = get_gateway()
        j_builder = gateway.jvm.FormatDescriptor.forFormat(format)
        return FormatDescriptor.Builder(j_builder)

    def get_format(self) -> str:
        return self._j_format_descriptor.getFormat()

    def get_options(self) -> Dict[str, str]:
        return self._j_format_descriptor.getOptions()

    def __str__(self):
        return self._j_format_descriptor.toString()

    def __eq__(self, other):
        return (self.__class__ == other.__class__ and
                self._j_format_descriptor.equals(other._j_format_descriptor))

    def __hash__(self):
        return self._j_format_descriptor.hashCode()

    class Builder(object):
        """
        Builder for FormatDescriptor.
        """

        def __init__(self, j_builder):
            self._j_builder = j_builder

        def option(self, key: Union[str, ConfigOption], value) -> 'FormatDescriptor.Builder':
            """
            Sets the given option on the format.

            Note that format options must not be prefixed with the format identifier itself here.

            Example:
            ::

                >>> FormatDescriptor.for_format("json") \
                ...     .option("ignore-parse-errors", "true") \
                ...     .build()

                will automatically be converted into its prefixed form:

                    'format' = 'json'
                    'json.ignore-parse-errors' = 'true'

            """
            if isinstance(key, str):
                self._j_builder.option(key, value)
            else:
                self._j_builder.option(key._j_config_option, value)
            return self

        def build(self) -> 'FormatDescriptor':
            """
            Returns an immutable instance of :class:`~pyflink.table.FormatDescriptor`.
            """
            return FormatDescriptor(self._j_builder.build())
